// -*- c -*-
//
// %CopyrightBegin%
//
// SPDX-License-Identifier: Apache-2.0
//
// Copyright Ericsson AB 2017-2025. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// %CopyrightEnd%
//

recv_marker_reserve(Dst) {
    /*
     * Call erts_msgq_recv_marker_insert() for the current process and
     * store its result in the Dst operand.
     *
     * The process state is swapped out before the call and swapped
     * back in afterwards.
     */
    SWAPOUT;

    $Dst = erts_msgq_recv_marker_insert(c_p);

    SWAPIN;
}

recv_marker_bind(Marker, Reference) {
    /*
     * Pass the Marker and Reference operands on to
     * erts_msgq_recv_marker_bind() for the current process.
     * NOTE(review): presumably Marker is a value produced by
     * recv_marker_reserve; confirm against the loader/compiler.
     */
    erts_msgq_recv_marker_bind(c_p, $Marker, $Reference);
}

recv_marker_clear(Reference) {
    /*
     * Pass the Reference operand on to erts_msgq_recv_marker_clear()
     * for the current process.
     */
    erts_msgq_recv_marker_clear(c_p, $Reference);
}

recv_marker_use(Reference) {
    /*
     * Pass the Reference operand on to erts_msgq_recv_marker_set_save()
     * for the current process.
     * NOTE(review): judging by the callee's name this repositions the
     * save pointer at the marker bound to Reference; confirm against
     * the message queue implementation.
     */
    erts_msgq_recv_marker_set_save(c_p, $Reference);
}

i_loop_rec(Dest) {
    //| -no_prefetch

    /*
     * Pick up the next message and place it in x(0).
     * If no message, jump to a wait or wait_timeout instruction.
     *
     * Dest is the relative address of that wait/wait_timeout
     * instruction.
     */

    ErtsMessage* msgp;

    /*
     * We need to disable GC while matching messages
     * in the queue. This since messages with data outside
     * the heap will be corrupted by a GC.
     */
    ASSERT(!(c_p->flags & F_DELAY_GC));
    c_p->flags |= F_DELAY_GC;

    /* Entry point from loop_rec_end (and locally) */
 loop_rec__:

    /*
     * Yield check. The current instruction is recorded as the place to
     * resume at, GC is re-enabled and the process is scheduled out.
     */
    if (FCALLS <= 0 && FCALLS <= neg_o_reds) {
        $SET_CP_I_ABS(I);
        c_p->flags &= ~F_DELAY_GC;
        SWAPOUT;
        c_p->arity = 0;
        c_p->current = NULL;
        goto do_schedule;
    }

    ASSERT(!ERTS_PROC_IS_EXITING(c_p));

    PROCESS_MAIN_CHK_LOCKS(c_p);

    msgp = erts_msgq_peek_msg(c_p);

    if (ERTS_UNLIKELY(msgp == NULL)) {
        /*
         * Nothing to peek at. Save the process state and let
         * erts_proc_sig_receive_helper() have a go; it may hand back a
         * message through msgp. The value it returns is subtracted
         * from FCALLS, and get_out tells us whether we have to leave
         * the receive loop (negative: yield, positive: exit).
         */
        int get_out;
        SWAPOUT;
        $SET_CP_I_ABS(I);
        c_p->arity = 0;
        c_p->current = NULL;
        c_p->fcalls = FCALLS;
        ERTS_UNREQ_PROC_MAIN_LOCK(c_p);
        FCALLS -= erts_proc_sig_receive_helper(c_p, FCALLS, neg_o_reds,
                                               &msgp, &get_out);
        ERTS_REQ_PROC_MAIN_LOCK(c_p);
        SWAPIN;
        if (ERTS_UNLIKELY(msgp == NULL)) {
            if (get_out) {
                if (get_out < 0) {
                    /*
                     * The yield condition at loop_rec__ is known to
                     * hold, so jumping there schedules us out.
                     */
                    ASSERT(FCALLS <= 0 && FCALLS <= neg_o_reds);
                    goto loop_rec__; /* yield */
                }
                else {
                    ASSERT(ERTS_PROC_IS_EXITING(c_p));
                    goto do_schedule; /* exit */
                }
            }

            /*
             * If there are no more messages in queue
             * (and we are not yielding or exiting)
             * erts_proc_sig_receive_helper()
             * returns with message queue lock locked...
             */
            c_p->flags &= ~F_DELAY_GC;
            $SET_I_REL($Dest);
            Goto(*I);		/* Jump to a wait or wait_timeout instruction */
        }
    }

    ASSERT(msgp == erts_msgq_peek_msg(c_p));
    ASSERT(msgp && ERTS_SIG_IS_MSG(msgp));

    /*
     * A message in external format has to be decoded before it can be
     * matched against.
     */
    if (ERTS_UNLIKELY(ERTS_SIG_IS_EXTERNAL_MSG(msgp))) {
        FCALLS -= 10; /* FIXME: bump appropriate amount... */
        SWAPOUT; /* erts_proc_sig_decode_dist() may write to heap... */
        if (!erts_proc_sig_decode_dist(c_p, ERTS_PROC_LOCK_MAIN, msgp, 0)) {
            /*
             * A corrupt distribution message that we weren't able to decode;
             * remove it...
             */
            /* No swapin should be needed */
            ASSERT(HTOP == c_p->htop && E == c_p->stop);
            /* TODO: Add DTrace probe for this bad message situation? */
            erts_msgq_unlink_msg(c_p, msgp);
            msgp->next = NULL;
            erts_cleanup_messages(msgp);
            /* ...and start over with whatever is next in the queue. */
            goto loop_rec__;
        }
        SWAPIN;
    }

    ASSERT(msgp == erts_msgq_peek_msg(c_p));
    ASSERT(ERTS_SIG_IS_INTERNAL_MSG(msgp));

    /*
     * Hand the message term to the matching code that follows.
     * F_DELAY_GC is still set at this point.
     */
    x(0) = ERL_MESSAGE_TERM(msgp);
}

remove_message() {
    //| -no_prefetch

    /*
     * Remove a (matched) message from the message queue.
     *
     * Besides unlinking the message this also updates the sequential
     * trace token of the process from the token carried by the message,
     * cancels the receive timer, and clears F_DELAY_GC (which i_loop_rec
     * set when the receive started).
     */

    ErtsMessage* msgp;
    PROCESS_MAIN_CHK_LOCKS(c_p);

    ERTS_CHK_MBUF_SZ(c_p);

    msgp = erts_msgq_peek_msg(c_p);

    if (ERTS_PROC_GET_SAVED_CALLS_BUF(c_p)) {
        save_calls(c_p, &exp_receive);
    }
    if (ERL_MESSAGE_TOKEN(msgp) == NIL) {
        /*
         * The message carries no token: clear the token of the process.
         * With VM probes enabled, a permanent dtrace user tag keeps the
         * am_have_dt_utag token; a non-permanent tag is cleared as well.
         */
#ifdef USE_VM_PROBES
        if (DT_UTAG(c_p) != NIL) {
            if (DT_UTAG_FLAGS(c_p) & DT_UTAG_PERMANENT) {
                SEQ_TRACE_TOKEN(c_p) = am_have_dt_utag;
            } else {
                DT_UTAG(c_p) = NIL;
                SEQ_TRACE_TOKEN(c_p) = NIL;
            }
        } else {
#endif
            SEQ_TRACE_TOKEN(c_p) = NIL;
#ifdef USE_VM_PROBES
        }
        DT_UTAG_FLAGS(c_p) &= ~DT_UTAG_SPREADING;
#endif
    } else if (ERL_MESSAGE_TOKEN(msgp) != am_undefined) {
        /*
         * The message carries a token: the process adopts it. (A token
         * of am_undefined matches neither branch and leaves the token
         * of the process as it is.)
         */
        Eterm msg;
        SEQ_TRACE_TOKEN(c_p) = ERL_MESSAGE_TOKEN(msgp);
#ifdef USE_VM_PROBES
        if (ERL_MESSAGE_TOKEN(msgp) == am_have_dt_utag) {
            /* Only a dtrace user tag; take it over unless we have one. */
            if (DT_UTAG(c_p) == NIL) {
                DT_UTAG(c_p) = ERL_MESSAGE_DT_UTAG(msgp);
            }
            DT_UTAG_FLAGS(c_p) |= DT_UTAG_SPREADING;
        } else {
#endif
            /*
             * A sequential trace token (a 5-tuple). Update the last
             * count and, if it is behind, the clock of the process from
             * the serial in the token, then emit a receive trace event.
             */
            ASSERT(is_tuple(SEQ_TRACE_TOKEN(c_p)));
            ASSERT(SEQ_TRACE_TOKEN_ARITY(c_p) == 5);
            ASSERT(is_small(SEQ_TRACE_TOKEN_SERIAL(c_p)));
            ASSERT(is_small(SEQ_TRACE_TOKEN_LASTCNT(c_p)));
            ASSERT(is_small(SEQ_TRACE_TOKEN_FLAGS(c_p)));
            ASSERT(is_pid(SEQ_TRACE_TOKEN_SENDER(c_p))
                   || is_atom(SEQ_TRACE_TOKEN_SENDER(c_p)));
            c_p->seq_trace_lastcnt = unsigned_val(SEQ_TRACE_TOKEN_SERIAL(c_p));
            if (c_p->seq_trace_clock < unsigned_val(SEQ_TRACE_TOKEN_SERIAL(c_p))) {
                c_p->seq_trace_clock = unsigned_val(SEQ_TRACE_TOKEN_SERIAL(c_p));
            }
            msg = ERL_MESSAGE_TERM(msgp);
            seq_trace_output(SEQ_TRACE_TOKEN(c_p), msg, SEQ_TRACE_RECEIVE,
                             c_p->common.id, c_p);
#ifdef USE_VM_PROBES
        }
#endif
    }
#ifdef USE_VM_PROBES
    /*
     * Fire the message_receive dtrace probe, if enabled, with the
     * receiver name, message size, queue length and token details.
     */
    if (DTRACE_ENABLED(message_receive)) {
        Eterm token2 = NIL;
        DTRACE_CHARBUF(receiver_name, DTRACE_TERM_BUF_SIZE);
        Sint tok_label = 0;
        Sint tok_lastcnt = 0;
        Sint tok_serial = 0;

        dtrace_proc_str(c_p, receiver_name);
        token2 = SEQ_TRACE_TOKEN(c_p);
        if (have_seqtrace(token2)) {
            tok_label = SEQ_TRACE_T_DTRACE_LABEL(token2);
            tok_lastcnt = signed_val(SEQ_TRACE_T_LASTCNT(token2));
            tok_serial = signed_val(SEQ_TRACE_T_SERIAL(token2));
        }
        DTRACE6(message_receive,
                receiver_name, size_object(ERL_MESSAGE_TERM(msgp)),
                c_p->sig_qs.mq_len,
                tok_label, tok_lastcnt, tok_serial);
    }
#endif
    /*
     * Take the message out of the queue and cancel any receive timer.
     * NOTE(review): judging by the callee's name the save pointer is
     * reset to the first message at the same time; confirm.
     */
    erts_msgq_unlink_msg_set_save_first(c_p, msgp);
    CANCEL_TIMER(c_p);

    /* Matching is over, so GC no longer has to be held back. */
    erts_save_message_in_proc(c_p, msgp);
    c_p->flags &= ~F_DELAY_GC;

    if (ERTS_IS_GC_DESIRED_INTERNAL(c_p, HTOP, E, 0)) {
        /*
         * We want to GC soon but we leave a few
         * reductions giving the message some time
         * to turn into garbage.
         */
        ERTS_VBUMP_LEAVE_REDS_INTERNAL(c_p, 5, FCALLS);
    }

    ERTS_DBG_CHK_REDS(c_p, FCALLS);
    ERTS_CHK_MBUF_SZ(c_p);

    ERTS_VERIFY_UNUSED_TEMP_ALLOC(c_p);
    PROCESS_MAIN_CHK_LOCKS(c_p);
}

loop_rec_end(Dest) {
    //| -no_next
    /*
     * Advance the save pointer to the next message (the current
     * message didn't match), then jump to the loop_rec instruction.
     *
     * Dest is the relative address of that loop_rec instruction. The
     * jump goes to the loop_rec__ label inside i_loop_rec, i.e. past
     * the point where F_DELAY_GC is set; the flag must therefore
     * already be set here.
     */

    ASSERT(c_p->flags & F_DELAY_GC);

    $SET_I_REL($Dest);
    erts_msgq_set_save_next(c_p);
    /* Each message that is skipped costs one reduction. */
    FCALLS--;
    goto loop_rec__;
}

timeout_locked() {
    /*
     * A timeout has occurred.  Reset the save pointer so that the next
     * receive statement will examine the first message first.
     *
     * This variant releases ERTS_PROC_LOCKS_MSG_RECEIVE first and then
     * does the same work as the timeout instruction.
     */

    erts_proc_unlock(c_p, ERTS_PROC_LOCKS_MSG_RECEIVE);
    $timeout();
}

timeout() {
    /*
     * Handle an expired receive timeout: report it to receive tracing
     * and to the saved-calls buffer when those are active, clear the
     * F_TIMO flag, and call erts_msgq_set_save_first() for the process.
     *
     * Also expanded as a macro by timeout_locked.
     */
    if (ERTS_IS_P_TRACED_FL(c_p, F_TRACE_RECEIVE)) {
        trace_receive(c_p, am_clock_service, am_timeout);
    }
    if (ERTS_PROC_GET_SAVED_CALLS_BUF(c_p)) {
        save_calls(c_p, &exp_timeout);
    }
    c_p->flags &= ~F_TIMO;
    erts_msgq_set_save_first(c_p);
}

TIMEOUT_VALUE() {
    /*
     * Shared tail of the i_wait_error instructions: set the failure
     * reason to EXC_TIMEOUT_VALUE, call erts_msgq_set_save_first() for
     * the process, and continue at find_func_info.
     */
    c_p->freason = EXC_TIMEOUT_VALUE;
    erts_msgq_set_save_first(c_p);
    goto find_func_info;
    //| -no_next
}

i_wait_error_locked() {
    /*
     * Same as i_wait_error, except that ERTS_PROC_LOCKS_MSG_RECEIVE is
     * released before the error is raised.
     */
    erts_proc_unlock(c_p, ERTS_PROC_LOCKS_MSG_RECEIVE);
    $TIMEOUT_VALUE();
}

i_wait_error() {
    /*
     * Raise the EXC_TIMEOUT_VALUE error; all the work is done by the
     * TIMEOUT_VALUE macro.
     */
    $TIMEOUT_VALUE();
}

//
// The wait instructions are assembled from the wait.* fragments
// defined below. The "unlocked" variants begin with wait.lock, which
// takes ERTS_PROC_LOCKS_MSG_RECEIVE; the "locked" variants do not take
// it. All variants end with wait.execute.
// NOTE(review): the fragments are presumably executed in the order
// they are listed; confirm against the beam_makeops documentation.
//

// Timeout given as an immediate value (wait.int).
wait_timeout_unlocked_int := wait.lock.int.execute;
wait_timeout_locked_int := wait.int.execute;

// Timeout given as a source operand holding a term (wait.src).
wait_timeout_unlocked := wait.lock.src.execute;
wait_timeout_locked := wait.src.execute;

// No timeout. Note that wait_locked uses the empty wait.unlocked
// fragment, i.e. it does no locking of its own.
wait_unlocked := wait.lock.execute;
wait_locked := wait.unlocked.execute;

wait.lock() {
    /*
     * Take ERTS_PROC_LOCKS_MSG_RECEIVE. Used as the first fragment of
     * the "unlocked" wait instruction variants.
     */
    erts_proc_lock(c_p, ERTS_PROC_LOCKS_MSG_RECEIVE);
}

// Intentionally empty fragment: the counterpart of wait.lock that
// takes no lock. Used as the first fragment of wait_locked.
wait.unlocked() {
}

wait.int(Int) {
    /*
     * Start the receive timer from an immediate timeout value.
     *
     * A timer that has already been set must NOT be set again, which is
     * why nothing is done when either of the F_INSLPQUEUE and F_TIMO
     * flags is set.
     */
    if (!(c_p->flags & (F_INSLPQUEUE | F_TIMO))) {
        /*
         * Store the address of the following instruction in
         * c_p->def_arg_reg[0] before the timer is started.
         */
        const BeamInstr **resume_slot = (const BeamInstr **) c_p->def_arg_reg;

        *resume_slot = $NEXT_INSTRUCTION;
        erts_set_proc_timer_uword(c_p, $Int);
    }
}

wait.src(Src) {
    /*
     * Start the receive timer from a timeout given as a term.
     *
     * A timer that has already been set must NOT be set again, which is
     * why nothing is done when either of the F_INSLPQUEUE and F_TIMO
     * flags is set.
     */
    if (!(c_p->flags & (F_INSLPQUEUE | F_TIMO))) {
        Eterm tmo = $Src;

        if (tmo == am_infinity) {
            /* No timer is started; only the F_TIMO flag is set. */
            c_p->flags |= F_TIMO;
        } else if (tmo == make_small(0)) {
            /* Zero timeout: drop the receive locks and move on at once. */
            erts_proc_unlock(c_p, ERTS_PROC_LOCKS_MSG_RECEIVE);
            $NEXT0();
        } else if (erts_set_proc_timer_term(c_p, tmo) != 0) {
            /* Wrong time: the term was not accepted as a timeout. */
            erts_proc_unlock(c_p, ERTS_PROC_LOCKS_MSG_RECEIVE);
            c_p->freason = EXC_TIMEOUT_VALUE;
            erts_msgq_set_save_first(c_p);
            goto find_func_info;
        } else {
            /*
             * The timer routine will set c_p->i to the value in
             * c_p->def_arg_reg[0].  Note that it is safe to use this
             * location because there are no living x registers in
             * a receive statement.
             */
            const BeamInstr **resume_slot = (const BeamInstr **) c_p->def_arg_reg;

            *resume_slot = $NEXT_INSTRUCTION;
        }
    }
}

//
// Prepare to wait indefinitely for a new message to arrive
// (or for the time set above if falling through from above).
// When a new message arrives, control will be transferred to
// the loop_rec instruction (at label L1).  In case of a
// timeout, control will be transferred to the timeout
// instruction following the wait_timeout instruction.
//

wait.execute(JumpTarget) {
    /*
     * Final fragment of every wait instruction: suspend the process
     * until something happens. JumpTarget is the relative address that
     * c_p->i is set to before the process is scheduled out.
     */
    $SET_REL_I(c_p->i, $JumpTarget); /* L1 */
    SWAPOUT;
    c_p->arity = 0;

    /*
     * Clear the ACTIVE state flag, unless the timer has already timed
     * out. This is done before the receive locks are released below.
     */
    if (!ERTS_PTMR_IS_TIMED_OUT(c_p)) {
        erts_atomic32_read_band_relb(&c_p->state,
                                         ~ERTS_PSFLG_ACTIVE);
    }
    ASSERT(!ERTS_PROC_IS_EXITING(c_p));
    erts_proc_unlock(c_p, ERTS_PROC_LOCKS_MSG_RECEIVE);
    c_p->current = NULL;
    goto do_schedule;
    //| -no_next
}
